Skip to content

[improve][broker] PIP-483: scalable topic auto split/merge#25980

Merged
merlimat merged 14 commits into
apache:masterfrom
merlimat:st-autoscale-evaluator
Jun 12, 2026
Merged

[improve][broker] PIP-483: scalable topic auto split/merge#25980
merlimat merged 14 commits into
apache:masterfrom
merlimat:st-autoscale-evaluator

Conversation

@merlimat

@merlimat merlimat commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Implements PIP-483: Scalable Topic Auto Split/Merge (sub-PIP of PIP-468). The controller now scales a scalable topic's segment count automatically, default-on cluster-wide, bounded by hard caps and asymmetric cooldowns.

End-to-end flow: each broker reports per-segment load to the metadata store → the controller leader reads load + consumer counts → a pure decision function decides → it dispatches to the existing split/merge protocols.

Increments (one commit each, each independently tested)

  1. Decision coreAutoScalePolicyEvaluator.decide(...), a pure I/O-free function: split pass (consumer-count then traffic, gated by splitCooldown + maxSegments) then merge pass (cold adjacent pair, gated by mergeCooldown + mergeWindow + minSegments + maxDagDepth). Plus SegmentLoadStats, AutoScaleConfig, AutoScaleDecision, SegmentLoadSample, and SegmentLayout.mergeDepth().
  2. Broker config + resolver — 17 scalableTopic* knobs (auto-scale on, maxSegments=64, maxDagDepth=10, splitCooldown=60s, mergeCooldown/Window=300s, four split + four merge rate thresholds, etc.) and AutoScaleConfig.fromBrokerConfig().
  3. Load record + reporterScalableTopicResources load get/put/delete paths and SegmentLoadReporter, which writes a sample only when a rate changed materially (default ±25%) or crossed zero, keeping metadata write volume bounded.
  4. Controller wiring — periodic AutoScaleTick (traffic) + event-driven evaluation on stream/checkpoint consumer registration (consumer-count scale-up within seconds, no polling). Serialized by an in-flight guard; splitSegment/mergeSegments set the cooldowns so manual ops count too.
  5. Broker sweepBrokerService periodically sweeps the segment:// topics it hosts, reads their in/out msg+byte rates, and feeds the reporter. This populates the records the controller reads.

Design highlights

  • Splits are fast, merges are lazy. Splits fire as soon as conditions hold (only splitCooldown coalesces bursts); merges require a durable cold window and a longer cooldown.
  • Max DAG depth caps merges, not splits — bounds split↔merge flip-flopping while never blocking a throughput-driven split.
  • The controller reacts, it doesn't poll — load is pushed to metadata on material change; the controller reads it. No cross-broker RPC fan-out.

Test plan

  • AutoScalePolicyEvaluatorTest (20 cases) — every split/merge rule, caps, cooldowns, hysteresis, adjacency, max-depth.
  • AutoScaleConfigTest, SegmentLoadReporterTest, SegmentLayoutTest (mergeDepth).
  • ScalableTopicControllerAutoScaleTest — load-driven split, consumer-driven split (event path), cold-pair merge, disabled no-op, cooldown blocks second split (against in-memory store + mocked admin).
  • V5SegmentLoadReporterTest — end-to-end: producing across a scalable topic + running the sweep writes a load record per segment.
  • Full org.apache.pulsar.broker.service.scalable.* suite + checkstyle.

Follow-up (separate PR)

Per-namespace and per-topic AutoScalePolicyOverride + admin.scalableTopics().set/get/removeAutoScalePolicy(...). The feature is fully functional via the cluster config without it.

merlimat added 5 commits June 5, 2026 09:20
First implementation increment for PIP-483: the pure, I/O-free decision logic and its data types. No live broker paths are wired yet.

- SegmentLoadStats: persisted per-segment load record (4 rates).
- AutoScaleConfig: fully-resolved policy (caps, cooldowns, split/merge thresholds), per-field javadoc.
- AutoScaleDecision: sealed Split | Merge | NoAction with reason strings.
- SegmentLoadSample: evaluator input (stats + metadata-store modified time).
- AutoScalePolicyEvaluator.decide(): split pass (consumer-count then traffic, gated by splitCooldown and maxSegments) followed by merge pass (cold adjacent pair, gated by mergeCooldown, mergeWindow, minSegments and maxDagDepth).
- SegmentLayout.mergeDepth(): counts merges in a segment's lineage, used by the depth cap.

Exhaustively unit-tested in AutoScalePolicyEvaluatorTest (18 cases) plus mergeDepth coverage in SegmentLayoutTest.
Second increment for PIP-483: the cluster-wide broker configuration and the resolver that flattens it into the policy the evaluator reads.

- ServiceConfiguration: 17 scalableTopicAutoScale*/threshold/loadReport knobs (all dynamic, CATEGORY_POLICIES) with the PIP defaults (auto-scale on, maxSegments=64, maxDagDepth=10, splitCooldown=60s, mergeCooldown/Window=300s, the four split + four merge rate thresholds, loadReportInterval=10s, rateChangeThreshold=0.25).
- AutoScaleConfig.fromBrokerConfig(): resolves the broker config into AutoScaleConfig. Namespace/topic override layering will graft onto this via toBuilder() in a later increment.
- AutoScaleConfigTest: defaults match the PIP, the split>merge hysteresis invariant holds, and overridden config propagates.
Third increment for PIP-483: the metadata-store path for per-segment load and the materiality-gated writer.

- ScalableTopicResources: SegmentLoadStats cache + reportSegmentLoadAsync (upsert), getSegmentLoadAsync (value + Stat, so the controller can read the modification timestamp for windowing), deleteSegmentLoadAsync (tolerates missing), and segmentLoadPath (.../segments/{id}/load).
- SegmentLoadReporter: writes a sample only when it changed materially since the last write — a rate moved by more than the relative threshold (default 25%) or crossed to/from zero — keeping metadata write volume bounded. forget() drops the cached last-written value when the broker stops owning a segment.

The reporter owns only the materiality decision + last-written cache; sampling live TopicStats and scheduling the sweep are wired in by the broker in a later increment.

Tested: isMaterialChange (relative threshold, zero-crossing, any-metric) plus reportIfChanged against an in-memory store (first-write, immaterial-skip keeps the Stat timestamp put, material-write, forget-forces-rewrite, delete-tolerates-missing).
Fourth increment for PIP-483: the controller leader now drives auto split/merge.

- Two evaluation triggers: a periodic AutoScaleTick (cadence scalableTopicAutoScaleIntervalSeconds, default 60s) for traffic-driven decisions, and an event-driven evaluation fired the moment a stream/checkpoint consumer registers (so consumer-count scale-up reacts within seconds, no polling). Both scheduled on leadership win, cancelled on leader-loss/close, mirroring the GC tick.
- evaluateAndAct(): collects per-subscription consumer counts (managed coordinators only — QUEUE bypass subs aren't tracked here, so they're naturally excluded) and per-segment load samples (value + Stat modified time), runs the pure AutoScalePolicyEvaluator, and dispatches to the existing splitSegment / mergeSegments. An AtomicBoolean serializes evaluation+dispatch so concurrent ticks/consumer events never launch overlapping auto operations.
- splitSegment/mergeSegments set lastSplitAtMs/lastMergeAtMs at entry, so the split/merge cooldowns cover manual operations too.

Tested in ScalableTopicControllerAutoScaleTest against an in-memory store + mocked admin: load-driven split, no-split-under-threshold, disabled-config no-op, cold-pair merge, consumer-driven split (event-driven), and split-cooldown-blocks-second-split.
Fifth increment for PIP-483: the connecting piece that actually populates the load records the controller reads.

BrokerService.startSegmentLoadReporter() starts a dedicated periodic sweep (scalable-segment-load-reporter, cadence scalableTopicLoadReportIntervalSeconds, default 10s) when scalable topics are enabled. Each tick iterates the segment:// topics this broker hosts via forEachTopic, derives the parent topic + segment id from the segment name, reads the topic's in/out msg and byte rates from getStats, and hands them to the SegmentLoadReporter — which writes to the metadata store only on a material change. The monitor is registered for graceful shutdown alongside the other broker monitors.

Tested end-to-end in V5SegmentLoadReporterTest: producing across a 2-segment scalable topic and running the sweep writes a load record for each segment; a plain persistent topic produces none and the sweep tolerates non-segment topics.
@merlimat merlimat marked this pull request as draft June 8, 2026 23:59
@merlimat merlimat marked this pull request as ready for review June 11, 2026 16:39
@merlimat merlimat requested a review from lhotari June 11, 2026 16:39
@lhotari

lhotari commented Jun 11, 2026

Copy link
Copy Markdown
Member

I performed an AI assisted (Claude Fable 5) review and these were the findings. Some might be useful to address.

Findings

  1. [QUALITY] In-flight guard can leak permanently — ScalableTopicController.java:322-341
    If anything between the successful autoScaleInFlight.compareAndSet and the future-chain construction throws synchronously (e.g. a misbehaving getSegmentLoadAsync), the flag is never reset: runAutoScaleSafely catches the Throwable but doesn't release the guard, so auto-scaling silently stops on this topic until a leadership change. Low likelihood (sync throws from async methods violate the codebase rules), but the failure mode is invisible and permanent — the feature dies with no recovery and nothing but a single warn log to show for it. A try/catch that resets the flag would harden it cheaply.

  2. [QUALITY] Load records leak in the metadata store; forget() is dead code
    deleteSegmentLoadAsync and SegmentLoadReporter.forget() have no production callers (tests only). The GC tick prunes segments from the layout and deletes backing topics (ScalableTopicController.java:1126) but leaves the .../segments/{id}/load znode behind forever. Likewise the reporter's lastWritten map grows unboundedly with segment churn on a long-lived broker, and its documented contract — "call when this broker stops owning the segment topic" — is never wired into unload/seal/delete. Both APIs were built for a purpose this PR doesn't fulfill; either wire them in (GC tick for the znode, topic-close path for forget) or note it as a known follow-up.

  3. [QUALITY] Materiality band is anchored at the last-written value, not at the thresholds — confirm intended
    The reporter skips writes when every rate changed <25% relative to the last written value (SegmentLoadReporter.java:95-102), with only a zero-crossing exception. The effect approximates hysteresis but with two quirks: it's path-dependent (a segment at 10,900 msg/s splits if the last record was 5,000, but not if it was 8,800 — same load, opposite outcomes), and the suppression is permanent rather than a delay (a true rate that settles inside (stored, stored × 1.25) never produces a new record, so a segment can run sustained at up to 25% over the split threshold — or sit 25–33% below the merge threshold — indefinitely without action). The impact is bounded by the tunable 0.25 knob and is one-directional (delays action, never causes spurious action), and adding threshold-crossing writes would interact with the merge window (each crossing resets the cold clock — arguably correct, but a behavior change). The PR frames the knob purely as metadata-write economy, so the right ask is: if the blind spot is accepted, document it on scalableTopicLoadReportRateChangeThreshold; if not, the reporter or evaluator needs threshold awareness — complicated by the reporter being deliberately policy-agnostic.

  4. [QUALITY] Topic ownership moves reset the merge window
    reportSegmentLoadAsync uses readModifyUpdateOrCreate(existing -> stats), which writes unconditionally; a newly-owning broker has an empty lastWritten cache, so after every unload/rebalance an identical record is rewritten and the znode mtime — which coldEnough uses as "cold since" — resets. Under frequent rebalancing, merges can be starved cluster-wide, since load-manager shedding is routine in a busy cluster. Returning existing from the RMW lambda when the values are equal would fix it. Relatedly, coldEnough compares the controller's clock.millis() against the metadata store's server-side mtime, so broker↔store clock skew shifts the window — acceptable for a heuristic, worth a code comment.

  5. [QUALITY] No validation of the policy config
    Nothing enforces minSegments ≤ maxSegments, positive cooldowns, or the invariant the AutoScaleConfig javadoc itself states as a requirement — split thresholds strictly above merge thresholds. A threshold configured as 0 makes the evaluator divide by zero: a positive rate scores Infinity (permanent split pressure), while 0/0 is NaN and silently ignored. A validation pass in fromBrokerConfig would be cheap and catches operator error at the right layer.

  6. [QUALITY] Cooldowns don't survive leader failover, and a failed split consumes one
    lastSplitAtMs/lastMergeAtMs are in-memory volatiles that reset to MIN_VALUE on a new leader, so an auto merge can fire immediately after failover even if one ran seconds earlier. Both timestamps are also set before the operation runs (ScalableTopicController.java:557,608), so a split that fails on a metadata conflict still burns the 60 s cooldown. Both are bounded in impact; worth a comment if intentional.

  7. [QUALITY] Several dynamic = true knobs aren't actually dynamic
    scalableTopicLoadReportIntervalSeconds and scalableTopicLoadReportRateChangeThreshold are read once in BrokerService.startSegmentLoadReporter() at broker start; scalableTopicAutoScaleIntervalSeconds once at leader election; and turning scalableTopicAutoScaleEnabled on after it was off at election time has no effect until the next leadership cycle (turning it off works, since evaluateAndAct rechecks every evaluation). The rate thresholds and cooldowns are genuinely dynamic via AutoScaleConfig.fromBrokerConfig per evaluation. Either honor changes for the others or drop their dynamic flag so update-dynamic-config users aren't misled.

  8. [INTENT MISMATCH] "Consumer-count scale-up within seconds" overstates the burst case
    Only the first split fires within seconds of consumer registration; a burst of N consumers scales one split per max(splitCooldown, tick), so going from 1 segment to N takes roughly (N−1)×60 s. A consumer-change event arriving while an evaluation is in-flight is also silently dropped (CAS fails) and only recovered by the next tick. Both behaviors follow from the cooldown design the PR body itself describes — calibrate the headline claim, or consider re-triggering an evaluation after a consumer-driven split completes so a burst converges in one cooldown chain rather than tick-by-tick.

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just check the local Claude Fable 5 review findings and the comments from my actual review. Feel free to mark them resolved without waiting for a new review.

merlimat added 9 commits June 11, 2026 11:32
…ous throw

If anything between the successful compareAndSet and the future-chain construction threw synchronously, the whenComplete callback never existed and the flag stayed set forever — silently disabling auto-scaling on the topic until a leadership change. Wrap the chain construction so the guard is released on a synchronous throw too.
…) on unload

Two leaks from the review:

- The GC tick pruned a segment from the layout and deleted its backing topic but left the .../segments/{id}/load znode behind forever. The prune fan-out now deletes the load record alongside the backing topic (deleteSegmentLoadAsync already tolerates a missing record).
- SegmentLoadReporter.forget() had no production caller, so the per-broker lastWritten cache grew unboundedly with segment churn, and a re-acquired segment's first sample could be wrongly suppressed as immaterial. BrokerService.removeTopicFromCache now drops the cache entry when a segment topic is unloaded/deleted from this broker.

The GC controller test now seeds a load record for the doomed segment and asserts it is gone after the prune tick.
…e terminology

- deleteScalableTopicAsync now uses MetadataStore.deleteRecursive, so deleting a scalable topic removes everything under its record: the controller leader lock, subscription + consumer registrations, and the per-segment load records. This also fixes the pre-existing subscription-record leak on topic delete. Covered by testDeleteScalableTopicCleansUp, which now seeds a subscription and a load record and asserts both are gone.
- Replace 'znode' with store-neutral wording in the javadoc/comments this PR introduced — ZooKeeper is only one metadata backend.
The merge window derives from the load record's Stat modification time, so any rewrite of an effectively-unchanged record restarts the 'cold since' clock. Two layers fixed:

- SegmentLoadReporter.reportIfChanged: on a local cache miss (broker restart or ownership just moved here) the materiality baseline is now seeded from the record already in the store and the ±threshold gate applies against it — instead of writing the first sample unconditionally. Under frequent load-manager rebalancing this was enough to starve merges cluster-wide.
- ScalableTopicResources.reportSegmentLoadAsync: a bit-identical value is no longer rewritten (readModifyUpdateOrCreate always puts, even for an equal value), as a backstop for any caller bypassing the reporter.

Also documents the controller-clock vs store-clock skew on the coldEnough window check. forget() keeps its cache-hygiene role; its javadoc now reflects that a re-acquire re-seeds from the store rather than force-rewriting (test updated accordingly, plus new coverage for the new-owner seeding and the identical-value no-op).
Nothing previously enforced the invariants the evaluator depends on. In particular a split threshold configured as 0 makes the overload score rate/threshold yield Infinity for any positive rate (permanent split pressure) and NaN for a zero rate (silently ignored) — doubles never throw on division by zero, so the misbehavior was invisible.

AutoScaleConfig.validated(), invoked from fromBrokerConfig (and reusable by the future namespace/topic override resolution), now rejects: minSegments < 1, maxSegments < minSegments, negative maxDagDepth/cooldowns/window, non-positive split thresholds, negative merge thresholds, and split thresholds not strictly above their merge counterparts (the hysteresis dead-band the javadoc already required). A violation surfaces as a warn-logged IllegalArgumentException on each evaluation until the operator fixes the config.

Merge thresholds of 0 remain valid as an explicit 'never merge' setting.
… operations

Two cooldown-clock fixes:

- lastSplitAtMs/lastMergeAtMs were set at splitSegment/mergeSegments entry, so an attempt that failed (e.g. segment-topic creation error, CAS conflict) still consumed the cooldown and blocked the retry for the full window. They are now set only after the layout CAS has succeeded.
- The clocks are in-memory and used to reset on every leader failover, allowing e.g. an auto merge seconds after one ran on the previous leader. The new leader now seeds them from the layout itself: a split's children carry exactly one parentId and a merge's child carries two, so the most recent createdAtMs of each class is precisely when the last split/merge happened. No new persistent state.

New tests: split cooldown enforced across a close + re-elect cycle, and a failed split leaving the layout unchanged retries immediately once the injected error clears.
Several knobs were marked dynamic=true but read once:

- scalableTopicAutoScaleEnabled: turning it ON after it was off at leader election had no effect until the next leadership cycle, because the tick was never scheduled. The tick is now scheduled unconditionally (a disabled evaluation is a cheap no-op that re-reads the flag), making enable/disable genuinely dynamic.
- scalableTopicLoadReportRateChangeThreshold: was captured at broker start in the reporter constructor. The reporter now takes a DoubleSupplier and re-reads the config on every sample (the double constructor remains for tests).
- scalableTopicAutoScaleIntervalSeconds and scalableTopicLoadReportIntervalSeconds genuinely are read once (at leadership win / broker start respectively): marked dynamic=false and documented, rather than pretending.

The rate thresholds, cooldowns, windows and caps were already dynamic — AutoScaleConfig.fromBrokerConfig re-resolves them on every evaluation.
…low-ups

Two burst-behavior gaps:

- A trigger arriving while an evaluation was in flight (CAS busy) was silently dropped and only recovered by the next periodic tick — e.g. a consumer registering mid-evaluation. The busy path now sets a re-evaluate flag that the in-flight run honors on completion, so coalesced triggers are never lost.
- After a successful auto split, one follow-up evaluation is scheduled right after the split cooldown expires. A burst of N consumers on one segment now converges at one split per cooldown (the chain stops at the first NoAction decision) instead of one split per periodic tick, which is slower whenever the cooldown is shorter than the tick.

New test: 4 consumers registering on a 1-segment topic converge to 4 segments purely from event-driven evaluations + the follow-up chain, with no periodic tick and no manual evaluation calls.
The load reporter's significant-change gate is anchored at the last written value, not at the split/merge thresholds: a rate that settles within the band of the last record is never re-reported, so a segment can sustain up to the configured factor beyond a threshold without the controller seeing it. The effect is path-dependent but one-directional (delays action, never causes a spurious one) and bounded by the knob.

Accepted as the cost of bounded metadata write volume; now documented on scalableTopicLoadReportRateChangeThreshold and in the SegmentLoadReporter javadoc so operators know to lower the threshold for tighter tracking.
@merlimat merlimat merged commit 20dc8f9 into apache:master Jun 12, 2026
156 of 163 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants